
Conversation

@viirya (Member) commented Dec 2, 2025

What changes were proposed in this pull request?

SupportsPushDownVariants was added as a Scan mix-in in #52578. This patch changes it to a ScanBuilder mix-in to follow the established patterns in the codebase.

Why are the changes needed?

SupportsPushDownVariants was added as a Scan mix-in in #52578. Changing it to a ScanBuilder mix-in follows the established pushdown patterns in the codebase, e.g., join pushdown and aggregate pushdown.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.0.14

@aokolnychyi (Contributor)

I personally would want the following changes to VARIANT pushdown in DSv2:

  1. Move the logic to ScanBuilder instead of Scan (this PR attempts to do exactly that).
  2. Evolve the connector API.
  • Rename interfaces to SupportsPushDownVariantExtractions / VariantExtraction (alternatives are welcome).
  • Pass each variant_get expression as a separate VariantExtraction so that connectors can check each field easily.
  • Return boolean[] from pushVariantExtractions to indicate what was pushed.
interface SupportsPushDownVariantExtractions extends ScanBuilder {
  boolean[] pushVariantExtractions(VariantExtraction[] extractions);
}

interface VariantExtraction {
  String[] columnName; // variant column name
  String path; // extraction path from variant_get and try_variant_get
  DataType expectedDataType; // expected data type
}
  3. Clearly state that connectors must only push down an extraction if the data has already been shredded. Connectors should not attempt to cast / extract on demand; it must be done in Spark if the data hasn't been shredded prior to the scan.

Basically, consider the following example:

  SELECT
    variant_get(v, '$.data[1].a', 'string'),
    variant_get(v, '$.key', 'int'),
    variant_get(s.v2, '$.x', 'double')
  FROM tbl;

This should pass the following extractions to the connector:

  VariantExtraction[] extractions = [
    new VariantExtraction(["v"], "$.data[1].a", StringType),
    new VariantExtraction(["v"], "$.key", IntegerType),
    new VariantExtraction(["s", "v2"], "$.x", DoubleType)  // ← nested in struct 's'
  ];

Connectors mark a VariantExtraction as pushed ONLY if they can guarantee that ALL records satisfy the expected type, meaning the data has been shredded prior to the scan.
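
A minimal connector-side sketch of how the proposed mix-in could be used, assuming the SupportsPushDownVariantExtractions and VariantExtraction shapes sketched above (they are part of the proposal, not existing Spark API); the ExampleScanBuilder class and isAlreadyShredded helper below are hypothetical:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;

// Imports for the proposed SupportsPushDownVariantExtractions / VariantExtraction
// are omitted because their final package is what this PR defines.
class ExampleScanBuilder implements ScanBuilder, SupportsPushDownVariantExtractions {
  private final List<VariantExtraction> pushedExtractions = new ArrayList<>();

  @Override
  public boolean[] pushVariantExtractions(VariantExtraction[] extractions) {
    boolean[] accepted = new boolean[extractions.length];
    for (int i = 0; i < extractions.length; i++) {
      // Accept an extraction only if table/file metadata proves this path was already
      // shredded with the expected type; otherwise Spark keeps evaluating it.
      if (isAlreadyShredded(extractions[i])) {
        pushedExtractions.add(extractions[i]);
        accepted[i] = true;
      }
    }
    return accepted;
  }

  @Override
  public Scan build() {
    // A real connector would build a Scan that reads the pushed extractions from
    // shredded columns; elided in this sketch.
    throw new UnsupportedOperationException("Scan construction elided in this sketch");
  }

  // Hypothetical metadata check; conservatively reports nothing as shredded here.
  private boolean isAlreadyShredded(VariantExtraction extraction) {
    return false;
  }
}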

@viirya (Member Author) commented Dec 4, 2025

Pass each variant_get expression as a separate VariantExtraction so that connectors can check each field easily.
interface VariantExtraction {
  String[] columnName; // variant column name
  String path; // extraction path from variant_get and try_variant_get
  DataType expectedDataType; // expected data type
}

Currently VariantAccessInfo represents an access to a variant column. So it has a member String columnName. What does a VariantExtraction represent?

Although you said "each variant_get expression as a separate VariantExtraction", if there are multiple variant_gets on the same variant column, do you mean to have multiple VariantExtractions? Currently they are all represented by one VariantAccessInfo for the variant column, which I think makes more sense.

@viirya (Member Author) commented Dec 4, 2025

Connectors mark VariantExtraction as pushed ONLY if they can guarantee that ALL records satisfy the expected type, meaning the data has been shredded prior to the scan.

I think connectors can still read and parse the variant to the required type even if it is not a shredded variant. From the point of view of Spark and the DSv2 API, we don't need to know how the connectors fulfill the pushdown requirement.

@aokolnychyi (Contributor) commented Dec 4, 2025

Currently VariantAccessInfo represents an access to a variant column. So it has a member String columnName. What does a VariantExtraction represent?

Although you said "each variant_get expression as a separate VariantExtraction", if there are multiple variant_gets on the same variant column, do you mean to have multiple VariantExtractions? Currently they are all represented by one VariantAccessInfo for the variant column, which I think makes more sense.

I expect each variant_get and try_variant_get to be converted to a VariantExtraction with the variant column name parts and the extraction JSON path. If a connector has shredded 2 out of 3 requested columns, it can simply mark with booleans what it supports and what must be done in Spark. If we use VariantAccessInfo, they would have to create a new StructType to indicate what they can extract? That seems very complicated and error-prone compared to returning booleans.
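
To make the 2-out-of-3 case concrete, a hedged illustration against the proposed API (the scanBuilder and extractions variables below are placeholders, not existing code):

// Extractions from the earlier example, in order:
//   ["v"]        "$.data[1].a"  string
//   ["v"]        "$.key"        int
//   ["s", "v2"]  "$.x"          double
boolean[] accepted = scanBuilder.pushVariantExtractions(extractions);
// e.g. accepted == [false, true, true]: only the last two paths were shredded, so the
// connector reads them from shredded columns while Spark still evaluates
// variant_get(v, '$.data[1].a', 'string') on top of the scan output.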

I think connectors can still read and parse the variant to the required type even if it is not a shredded variant. From the point of view of Spark and the DSv2 API, we don't need to know how the connectors fulfill the pushdown requirement.

I feel this is VERY dangerous. I read through the casting logic in Spark. It has so many edge cases. There is no way connectors will replicate this behavior. We don't want to have inconsistent extraction between connectors. In the future, we may add a casting function to VariantExtraction that Spark would provide. That said, I would not do it now.

@viirya (Member Author) commented Dec 4, 2025

Okay, I'm updating this PR according to these ideas. I will update it soon.

@aokolnychyi (Contributor)

Does the latest proposal make sense to you too, @cloud-fan @gengliangwang @dongjoon-hyun? Any thoughts?

@dongjoon-hyun (Member)

It would be great if we could see the tangible code to review, @aokolnychyi.

Could you update the PR if it's ready, @viirya?

@viirya (Member Author) commented Dec 5, 2025

Could you update the PR if it's ready, @viirya?

I will update this once it is ready.

@aokolnychyi (Contributor)

One more question I forgot. Do we need to make VariantExtraction extend Serializable? Is it supposed to be sent to executors? Seems like this will be done on the driver.

@dongjoon-hyun (Member)

Given the current status, I made a PR to branch-4.1 to mark this interface as Experimental for Apache Spark 4.1.0 only in order to be safe.

dongjoon-hyun added a commit that referenced this pull request on Dec 6, 2025: …tal`

### What changes were proposed in this pull request?

This PR aims to mark `SupportsPushDownVariants` as `Experimental` instead of `Evolving` in Apache Spark 4.1.x.

### Why are the changes needed?

During Apache Spark 4.1.0 RC2, it turns out that this new `Variant` improvement feature still needs more time to stabilize.

- #52522
- #52578
- #53276
- [[VOTE] Release Spark 4.1.0 (RC2)](https://lists.apache.org/thread/og4dn0g7r92qj22fdsmqoqs518k324q5)

We had better mark this interface itself as `Experimental` in Apache Spark 4.1.0 while keeping it `Evolving` in the `master` branch.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53354 from dongjoon-hyun/SPARK-54616.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@viirya (Member Author) commented Dec 8, 2025

One more question I forgot. Do we need to make VariantExtraction extend Serializable? Is it supposed to be sent to executors? Seems like this will be done on the driver.

Hmm, for the built-in ParquetScan it is not required, because it doesn't send VariantExtraction to executors. It uses the extraction info to transform the schema, and the physical reader uses the schema to do the variant rewriting.

But it probably needs to be Serializable for third-party data source implementations, as we don't know how they will use the extraction info. They may send the info to executors and do the rewriting there.
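
For illustration, a hedged sketch of such a third-party use: DSv2's PartitionReaderFactory extends Serializable and is shipped to executors, so any VariantExtraction values captured in it would need to be serializable too. The ExampleReaderFactory below is hypothetical, and the import for VariantExtraction is omitted since its package is defined in this PR.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

// Hypothetical third-party factory: PartitionReaderFactory instances are serialized to
// executors, so the captured extraction info travels with them.
class ExampleReaderFactory implements PartitionReaderFactory {
  private final VariantExtraction[] pushedExtractions; // must be serializable to ship

  ExampleReaderFactory(VariantExtraction[] pushedExtractions) {
    this.pushedExtractions = pushedExtractions;
  }

  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    // A real connector would use pushedExtractions here to rewrite variant reads
    // executor-side; elided in this sketch.
    throw new UnsupportedOperationException("reader construction elided in this sketch");
  }
}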

@dongjoon-hyun (Member)

Thank you for updating the PR, @viirya.

* @since 4.1.0
*/
@Experimental
public final class VariantExtractionImpl implements VariantExtraction, Serializable {
Contributor:

Does it need to be public API?

@viirya (Member Author), Dec 9, 2025:

This can be internal, but that means every data source implementation needs to implement its own VariantExtraction. I think we can provide one implementation?

case nestedStruct: StructType =>
  field.name +: getColumnName(nestedStruct, rest)
case _ =>
  throw new IllegalArgumentException(
Contributor:

we should throw SparkException.internal

@viirya (Member Author):

Ok

@viirya (Member Author):

You mean SparkException.internalError, right?

* Pushes down variant field extractions to the data source.
* <p>
* Each element in the input array represents one field extraction operation from a variant
* column. Data sources should examine each extraction and determine whether it can be
@cloud-fan (Contributor), Dec 9, 2025:

Does Spark do any reconciliation? e.g. variant_get(v, '$.a', 'int') and variant_get(v, '$.b', 'int') will be two extractions, but how about variant_get(v, '$.a', 'struct<x int, y int>') and variant_get(v, '$.a.x', 'int')? Are they two extractions as well?

@viirya (Member Author):

Yes, they are two extractions. Each separate variant_get on a variant column will be its own extraction.

@dongjoon-hyun (Member) left a comment:

Please update the PR title with a valid JIRA ID, @viirya.

@viirya changed the title from "[SPARK-XXXXX][SQL] Refactor SupportsPushDownVariants to be a ScanBuilder mix-in" to "[SPARK-54656][SQL] Refactor SupportsPushDownVariants to be a ScanBuilder mix-in" on Dec 9, 2025
@viirya (Member Author) commented Dec 9, 2025

Please update the PR title with a valid JIRA ID, @viirya.

Thank you, @dongjoon-hyun. I updated it.

@dongjoon-hyun (Member)

Thank you so much, @viirya.

* @since 4.1.0
*/
@Experimental
public final class VariantExtractionImpl implements VariantExtraction {
Contributor:

I don't see the value of separating the interface and impl if we need to expose the impl anyway. Can we follow TableInfo and make VariantExtraction a class directly?

Contributor:

But let me confirm again: the API means Spark passes VariantExtraction instances to the data source, and data sources need to return booleans. Why do data sources need to use the impl class?

@viirya (Member Author):

Oh, you're right. We don't need to expose VariantExtractionImpl but only the interface.

* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources;
Contributor:

@viirya (Member Author):

Okay. I moved it there and made it a Scala file instead of a Java file.

require(expectedDataType != null, "expectedDataType cannot be null")
require(columnName.nonEmpty, "columnName cannot be empty")

override def equals(obj: Any): Boolean = obj match {
Contributor:

I think Scala case classes already implement these basic methods.

@viirya (Member Author):

Removed. Thanks.

@dongjoon-hyun (Member)

Do you want to address Wenchen's last comment, @viirya?

@dongjoon-hyun self-assigned this on Dec 10, 2025
@viirya (Member Author) commented Dec 10, 2025

Do you want to address Wenchen's last comment, @viirya?

Yea, addressed it. Thanks.

@dongjoon-hyun (Member) left a comment:

Thank you, @viirya, @cloud-fan, @gengliangwang.

Pending CIs.

@dongjoon-hyun (Member)

Could you remove the unused import too, @viirya?

Error: ] /home/runner/work/spark-1/spark-1/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala:20: Unused import
Applicable -Wconf / @nowarn filters for this fatal warning: msg=<part of the message>, cat=unused-imports, site=org.apache.spark.sql.internal.connector

@dongjoon-hyun (Member)

I removed it, @viirya.

@viirya (Member Author) commented Dec 10, 2025

I removed it, @viirya.

Okay. Thanks.
